{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": [],
      "toc_visible": true,
      "authorship_tag": "ABX9TyN4D6iVLtKHQm9QRmuNNcC4"
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License"
      ],
      "metadata": {
        "cellView": "form",
        "id": "IA9uYREbI3m3"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Anomaly Detection with Ensemble Models using Apache Beam (Isolation Forest and LOF)\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>"
      ],
      "metadata": {
        "id": "Eef1VqflIrXW"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "This notebook demonstrates how to perform anomaly detection on streaming data using the AnomalyDetection PTransform, a new feature introduced in Apache Beam 2.64.0 with more improvement on offline model support in 2.65.0.\n",
        "\n",
        "This notebook is divided into two main sections:\n",
        "- Running a Single Offline Model: We will first fetch the data set of Statlog (shuttle) from UCI Machine Learning Repository (cached in gs://apache-beam-samples/anomaly_detection/shuttle/, original link: https://archive.ics.uci.edu/dataset/148/statlog+shuttle). This data will be streamed into the pipeline following a periodic impulse. Our Beam pipeline will then apply the AnomalyDetection PTransform with the a pre-trained isolation forest model, compute model metrics.\n",
        "- Running an Ensemble of Models: Using the same training data, we now train a\n",
        "second offline model and"
      ],
      "metadata": {
        "id": "m0xhMRWfX07o"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Preparation"
      ],
      "metadata": {
        "id": "V1950SxGb0u2"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Setting Environment Variables"
      ],
      "metadata": {
        "id": "EKkFQE8iwT7M"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "L_18eUf7QU2I"
      },
      "outputs": [],
      "source": [
        "# GCP project id\n",
        "PROJECT_ID = 'apache-beam-testing'  # @param {type:'string'}\n",
        "\n",
        "# Temporary root path, used to store generated files (and temp and staging files if running on Dataflow)\n",
        "TEMP_ROOT = 'gs://apache-beam-testing-temp'  # @param {type:'string'}\n",
        "\n",
        "# Required if running on Dataflow\n",
        "REGION = 'us-central1'  # @param {type:'string'}\n",
        "\n",
        "BEAM_VERSION = '2.65.0'\n",
        "\n",
        "import random\n",
        "SUFFIX = str(random.randint(0, 10000))"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "import sys\n",
        "if 'google.colab' in sys.modules:\n",
        "  from google.colab import auth\n",
        "  auth.authenticate_user(project_id=PROJECT_ID)"
      ],
      "metadata": {
        "id": "A_49Y2aTQeiH"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Installing Beam and Other Dependencies"
      ],
      "metadata": {
        "id": "0WXpo4aDwG3N"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# For running with local prism runner\n",
        "!pip install 'apache_beam[interactive]=={BEAM_VERSION}' --quiet"
      ],
      "metadata": {
        "id": "5hpDAMOyQfHP"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Install pyod for offline anomaly detectors\n",
        "!pip install pyod==2.0.3"
      ],
      "metadata": {
        "id": "KlkX-iwVm42J"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Part 1: Running an Offline Isolation Forest Model"
      ],
      "metadata": {
        "id": "pAoMoi0McEIH"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Model Training"
      ],
      "metadata": {
        "id": "UIzbsGtWto5X"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Download the sample data from GCS\n",
        "train_data_fn = \"./shuttle.trn\"\n",
        "! gcloud storage cp \"gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn\" {train_data_fn}"
      ],
      "metadata": {
        "id": "vb6ubiSyipuG"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "import pandas as pd\n",
        "import pyod.models.iforest as iforest\n",
        "import pickle\n",
        "\n",
        "FIELD_NAMES = [\"time\", \"f1\", \"f2\", \"f3\", \"f4\", \"f5\", \"f6\", \"f7\", \"f8\", \"target\"]\n",
        "SEP = \" \"\n",
        "df = pd.read_csv(train_data_fn, sep=\" \", names=FIELD_NAMES)\n",
        "\n",
        "# We don't need the time and target field for training.\n",
        "train_data = df.drop(['time', 'target'], axis=1)\n",
        "train_data_np = train_data.to_numpy()\n",
        "\n",
        "# Training an isolation forest model\n",
        "my_iforest = iforest.IForest(random_state=1234)\n",
        "my_iforest.fit(train_data_np)\n",
        "\n",
        "# Save the model into a file\n",
        "iforest_pickled_fn = './iforest_pickled'\n",
        "with open(iforest_pickled_fn, 'wb') as fp:\n",
        "  pickle.dump(my_iforest, fp)"
      ],
      "metadata": {
        "id": "P9MzeokSiPxK"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Upload the pickled model file to GCS\n",
        "PICKLED_PATH = TEMP_ROOT + '/anomaly/iforest-notebook-' + SUFFIX + '/pickled'\n",
        "iforest_pickled_fn_gcs = PICKLED_PATH + '/iforest.pickled'\n",
        "\n",
        "! gcloud storage cp {iforest_pickled_fn} {iforest_pickled_fn_gcs}"
      ],
      "metadata": {
        "id": "01JdmTuXuqY_"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Defining a Streaming Source and a DoFn for Model Metrics"
      ],
      "metadata": {
        "id": "q9HmkOY5vW-8"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import math\n",
        "from typing import Any\n",
        "from collections.abc import Sequence\n",
        "\n",
        "import sklearn\n",
        "\n",
        "import apache_beam as beam\n",
        "from apache_beam.coders import PickleCoder\n",
        "from apache_beam.coders import VarIntCoder\n",
        "from apache_beam.transforms.periodicsequence import PeriodicImpulse\n",
        "from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n",
        "from apache_beam.transforms.window import FixedWindows\n",
        "from apache_beam.ml.anomaly.base import AnomalyResult\n",
        "\n",
        "class SequenceToPeriodicStream(beam.PTransform):\n",
        "  \"\"\" A streaming source that generate periodic event based on a given sequence. \"\"\"\n",
        "  def __init__(self, data:Sequence[Any], delay: float = 0.1, repeat: bool = True):\n",
        "    self._data = data\n",
        "    self._delay = delay\n",
        "    self._repeat = repeat\n",
        "\n",
        "  class EmitOne(beam.DoFn):\n",
        "    INDEX_SPEC = ReadModifyWriteStateSpec('index', VarIntCoder())\n",
        "\n",
        "    def __init__(self, data, repeat):\n",
        "      self._data = data\n",
        "      self._repeat = repeat\n",
        "      self._max_index = len(self._data)\n",
        "\n",
        "    def process(self, element, model_state=beam.DoFn.StateParam(INDEX_SPEC)):\n",
        "      index = model_state.read() or 0\n",
        "      if index >= self._max_index:\n",
        "        return\n",
        "\n",
        "      yield self._data[index]\n",
        "\n",
        "      index += 1\n",
        "      if self._repeat:\n",
        "        index %= self._max_index\n",
        "      model_state.write(index)\n",
        "\n",
        "  def expand(self, input):\n",
        "    return (input | PeriodicImpulse(fire_interval=self._delay)\n",
        "        | beam.Map(lambda x: (0, x))\n",
        "        | beam.ParDo(SequenceToPeriodicStream.EmitOne(self._data, self._repeat))\n",
        "        | beam.WindowInto(FixedWindows(self._delay)))\n",
        "\n",
        "\n",
        "class ComputeMetrics(beam.DoFn):\n",
        "    \"\"\" A DoFn to compute Area Under Curve (AUC) \"\"\"\n",
        "    METRIC_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', PickleCoder())\n",
        "\n",
        "    def __init__(self, get_target):\n",
        "        self._underlying: tuple[list[float], list[int]]\n",
        "        self._get_target = get_target\n",
        "\n",
        "    def process(self,\n",
        "              element: tuple[Any, AnomalyResult],\n",
        "              metric_state=beam.DoFn.StateParam(METRIC_STATE_INDEX),\n",
        "              **kwargs):\n",
        "        self._underlying: tuple[list[float], list[int]] = metric_state.read()\n",
        "        if self._underlying is None:\n",
        "            scores = []\n",
        "            labels = []\n",
        "            targets = []\n",
        "            self._underlying = (scores, labels, targets)\n",
        "        else:\n",
        "            scores, labels, targets = self._underlying\n",
        "\n",
        "        prediction = next(iter(element[1].predictions))\n",
        "        if math.isnan(prediction.score):\n",
        "            yield element[0], beam.Row()\n",
        "        else:\n",
        "            # buffer the scores and targets for auc computation\n",
        "            scores.append(prediction.score)\n",
        "            labels.append(prediction.label)\n",
        "            targets.append(self._get_target(element[1].example))\n",
        "\n",
        "            accuracy = sklearn.metrics.accuracy_score(targets, labels)\n",
        "            recall = sklearn.metrics.recall_score(targets, labels)\n",
        "            precision = sklearn.metrics.precision_score(targets, labels)\n",
        "            f1 = sklearn.metrics.f1_score(targets, labels)\n",
        "            fpr, tpr, _ = sklearn.metrics.roc_curve(targets, scores)\n",
        "            auc = sklearn.metrics.auc(fpr, tpr)\n",
        "\n",
        "            yield element[0], beam.Row(id=element[1].example.id,\n",
        "                                       target=element[1].example.target,\n",
        "                                       predicted_label=next(iter(element[1].predictions)).label,\n",
        "                                       predicted_score=next(iter(element[1].predictions)).score,\n",
        "                                       accuracy=float(accuracy),\n",
        "                                       recall=float(recall),\n",
        "                                       precision=float(precision),\n",
        "                                       f1=float(f1),\n",
        "                                       auc=float(auc))\n",
        "\n",
        "        metric_state.write(self._underlying)"
      ],
      "metadata": {
        "id": "4WgxpmAPQrbv"
      },
      "execution_count": null,
      "outputs": []
    },
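    {
      "cell_type": "markdown",
      "source": [
        "To make the metrics concrete, the following minimal sketch applies the same `sklearn.metrics` calls used in `ComputeMetrics` to a tiny hand-made buffer. The `toy_*` names and values are made up for illustration and do not come from the Shuttle data set."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "import sklearn.metrics\n",
        "\n",
        "# Hypothetical buffers, shaped like the ones ComputeMetrics keeps in state.\n",
        "toy_targets = [0, 0, 1, 1]          # ground truth: 1 = anomaly\n",
        "toy_scores = [0.1, 0.4, 0.35, 0.8]  # higher score = more anomalous\n",
        "toy_labels = [0, 0, 0, 1]           # thresholded predictions\n",
        "\n",
        "# Label-based metrics compare the predicted labels with the ground truth.\n",
        "toy_accuracy = sklearn.metrics.accuracy_score(toy_targets, toy_labels)\n",
        "toy_precision = sklearn.metrics.precision_score(toy_targets, toy_labels)\n",
        "toy_recall = sklearn.metrics.recall_score(toy_targets, toy_labels)\n",
        "\n",
        "# AUC is threshold-free: it depends only on how the scores rank the examples.\n",
        "toy_fpr, toy_tpr, _ = sklearn.metrics.roc_curve(toy_targets, toy_scores)\n",
        "toy_auc = sklearn.metrics.auc(toy_fpr, toy_tpr)\n",
        "\n",
        "print(toy_accuracy, toy_precision, toy_recall, toy_auc)"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },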
    {
      "cell_type": "markdown",
      "source": [
        "### Preparing Test Data for Streaming"
      ],
      "metadata": {
        "id": "OGRa4fbKxpr0"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Download the data from GCS\n",
        "test_data_fn = \"./test.trn\"\n",
        "! gcloud storage cp \"gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.tst\" {test_data_fn}"
      ],
      "metadata": {
        "id": "fnidxXybxuFD"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "from apache_beam.io.filesystems import FileSystems\n",
        "import pandas as pd\n",
        "\n",
        "FIELD_NAMES = [\"time\", \"f1\", \"f2\", \"f3\", \"f4\", \"f5\", \"f6\", \"f7\", \"f8\", \"target\"]\n",
        "SEP = \" \"\n",
        "with FileSystems().open(test_data_fn) as f:\n",
        "  df = pd.read_csv(f, sep=\" \", names=FIELD_NAMES)\n",
        "  # just use first 500 instances for demo\n",
        "  df = df[:500]\n",
        "  rows = [row.to_dict() for _, row in df.iterrows()]\n",
        "  for i, row in enumerate(rows):\n",
        "    row[\"id\"] = i\n",
        "\n",
        "# Dropping time and target for testing\n",
        "test_data_cols = FIELD_NAMES.copy()\n",
        "test_data_cols.remove(\"time\")\n",
        "test_data_cols.remove(\"target\")"
      ],
      "metadata": {
        "id": "iwkrY8oXSAlQ"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Constructing the Pipeline and Running with Prism"
      ],
      "metadata": {
        "id": "GKFwC-ky0XZ5"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory\n",
        "\n",
        "# Create detector for PyOd model pickled file\n",
        "detector = PyODFactory.create_detector(iforest_pickled_fn_gcs, features=test_data_cols)"
      ],
      "metadata": {
        "id": "jP_M9pY2mp1C"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "from apache_beam.ml.anomaly.transforms import AnomalyDetection\n",
        "from apache_beam.transforms.window import GlobalWindows\n",
        "from apache_beam.io import fileio\n",
        "\n",
        "import logging\n",
        "logging.getLogger().setLevel(logging.INFO)\n",
        "\n",
        "# Running the pipeline on prism\n",
        "options = PipelineOptions([\n",
        "    \"--streaming\",\n",
        "    \"--job_server_timeout=600\",\n",
        "    \"--environment_type=LOOPBACK\",\n",
        "    \"--runner=PrismRunner\",\n",
        "])\n",
        "\n",
        "with beam.Pipeline(options=options) as p:\n",
        "  _ = (p\n",
        "       | SequenceToPeriodicStream(rows, delay=1, repeat=True)\n",
        "       | beam.Map(lambda x: beam.Row(**x))\n",
        "       | beam.WithKeys(0)\n",
        "       | AnomalyDetection(detector=detector)\n",
        "       | beam.WindowInto(GlobalWindows()) # put everything into global window to compute overall auc\n",
        "       | beam.ParDo(ComputeMetrics(lambda x: 1 if x.target != 1 else 0))\n",
        "       | beam.LogElements()\n",
        "  )"
      ],
      "metadata": {
        "id": "ghFoWPAeS6SH"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Alternative: Running the Pipeline with Dataflow Runner"
      ],
      "metadata": {
        "id": "I-6hza8-DcY5"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Environment Variables for Dataflow Runner\n",
        "TEMP_LOCATION = TEMP_ROOT + '/anomaly/iforest-notebook-' + SUFFIX + '/temp'\n",
        "STAGING_LOCATION = TEMP_ROOT + '/anomaly/iforest-notebook-' + SUFFIX + '/staging'"
      ],
      "metadata": {
        "id": "Gh-bU6vsD9vD"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# For running with dataflow runner\n",
        "!pip install 'apache_beam[interactive,gcp]=={BEAM_VERSION}' --quiet"
      ],
      "metadata": {
        "id": "0C0qur71DiN3"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Running the pipeline on dataflow\n",
        "options = PipelineOptions([\n",
        "  \"--runner=DataflowRunner\",\n",
        "  \"--temp_location=\" + TEMP_LOCATION,\n",
        "  \"--staging_location=\" + STAGING_LOCATION,\n",
        "  \"--project=\" + PROJECT_ID,\n",
        "  \"--region=\" + REGION,\n",
        "  \"--extra_package=gs://apache-beam-samples/anomaly_detection/pyod/pyod-2.0.3.tar.gz\",\n",
        "])\n",
        "\n",
        "with beam.Pipeline(options=options) as p:\n",
        "  _ = (p\n",
        "       | SequenceToPeriodicStream(rows, delay=1, repeat=True)\n",
        "       | beam.Map(lambda x: beam.Row(**x))\n",
        "       | beam.WithKeys(0)\n",
        "       | AnomalyDetection(detector=detector)\n",
        "       | beam.WindowInto(GlobalWindows()) # put everything into global window to compute overall auc\n",
        "       | beam.ParDo(ComputeMetrics(lambda x: 1 if x.target != 1 else 0))\n",
        "       | beam.LogElements()\n",
        "  )"
      ],
      "metadata": {
        "id": "-uspd0osDocV"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Part 2: Running an Ensemble of Models"
      ],
      "metadata": {
        "id": "xCkGdyxheB0J"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Another Model Training"
      ],
      "metadata": {
        "id": "zolNal0V01Na"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from pyod.models.loda import LODA\n",
        "\n",
        "my_lof = LODA()\n",
        "my_lof.fit(train_data_np)"
      ],
      "metadata": {
        "id": "KkyuDIVoePnv"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Save the model into a file\n",
        "lof_pickled_fn = './lof_pickled'\n",
        "with open(lof_pickled_fn, 'wb') as fp:\n",
        "  pickle.dump(my_lof, fp)\n",
        "\n",
        "# Write to GCS\n",
        "lof_pickled_fn_gcs = PICKLED_PATH + '/lof.pickled'\n",
        "\n",
        "! gcloud storage cp {lof_pickled_fn} {lof_pickled_fn_gcs}"
      ],
      "metadata": {
        "id": "x_lEnelMj1fD"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Create detector for PyOd model pickled file\n",
        "detector1 = PyODFactory.create_detector(iforest_pickled_fn_gcs, features=test_data_cols, model_id=\"iforest\")\n",
        "detector2 = PyODFactory.create_detector(lof_pickled_fn_gcs, features=test_data_cols, model_id=\"lof\")"
      ],
      "metadata": {
        "id": "N5w0K5B1jnIm"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Run an Ensemble Anomaly Detector"
      ],
      "metadata": {
        "id": "u733O9kN1D40"
      }
    },
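    {
      "cell_type": "markdown",
      "source": [
        "The pipeline in this section combines the two detectors with the `AnyVote` aggregation strategy. The plain-Python sketch below illustrates the idea only, assuming that an any-vote rule flags an example as an outlier when at least one detector does. It is not Beam's implementation, and `any_vote` and `toy_votes` are hypothetical names."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "def any_vote(labels):\n",
        "  # Return 1 (outlier) if at least one model voted 1, otherwise 0 (normal).\n",
        "  return 1 if any(label == 1 for label in labels) else 0\n",
        "\n",
        "# Hypothetical per-model labels for three examples: (first model, second model).\n",
        "toy_votes = [(0, 0), (1, 0), (1, 1)]\n",
        "print([any_vote(votes) for votes in toy_votes])"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },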
    {
      "cell_type": "code",
      "source": [
        "from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector\n",
        "from apache_beam.ml.anomaly.aggregations import AnyVote\n",
        "\n",
        "# Running the pipeline on prism\n",
        "options = PipelineOptions([\n",
        "    \"--streaming\",\n",
        "    \"--job_server_timeout=600\",\n",
        "    \"--environment_type=LOOPBACK\",\n",
        "    \"--runner=PrismRunner\",\n",
        "])\n",
        "\n",
        "with beam.Pipeline(options=options) as p:\n",
        "  _ = (p\n",
        "       | SequenceToPeriodicStream(rows, delay=1, repeat=True)\n",
        "       | beam.Map(lambda x: beam.Row(**x))\n",
        "       | beam.WithKeys(0)\n",
        "       | AnomalyDetection(detector=EnsembleAnomalyDetector([detector1, detector2],\n",
        "                                                           aggregation_strategy=AnyVote()))\n",
        "       | beam.LogElements()\n",
        "  )"
      ],
      "metadata": {
        "id": "gWRplG09kXmh"
      },
      "execution_count": null,
      "outputs": []
    }
  ]
}